Skip to main content

Working With Topics

Starting with DXAPI version 5.5.42, full support for working with Topics has been introduced. This includes the ability to write data into topics and read data from them.

tip

Additional information about Topics can be found here

Working with topics in the C++ dxapi is closely tied to the topic schema. Therefore, it is highly recommended to use the Solution Generator to generate your code for reading and writing, ensuring compatibility with the topic's structure.

tip

Additional information about Solution Generator can be found here

Before proceeding with examples of reading and writing, ensure you have created a topic and generated a project capable of interacting with topics.

Creating a Topic

To create a topic, use Create Topic statement in DDL. For example:

CREATE TOPIC IF NOT EXISTS "trades-topic" (
ENUM "deltix.timebase.api.messages.AggressorSide" 'Aggressor Side' (
"BUY" = 0,
"SELL" = 1
);
ENUM "deltix.timebase.api.messages.MarketEventType" 'Market Event Type' (
"BID" = 0,
"OFFER" = 1,
"TRADE" = 2,
"INDEX_VALUE" = 3,
"OPENING_PRICE" = 4,
"CLOSING_PRICE" = 5,
"SETTLEMENT_PRICE" = 6,
"TRADING_SESSION_HIGH_PRICE" = 7,
"TRADING_SESSION_LOW_PRICE" = 8,
"TRADING_SESSION_VWAP_PRICE" = 9,
"IMBALANCE" = 10,
"TRADE_VOLUME" = 11,
"OPEN_INTEREST" = 12,
"COMPOSITE_UNDERLYING_PRICE" = 13,
"SIMULATED_SELL_PRICE" = 14,
"SIMULATED_BUY_PRICE" = 15,
"MARGIN_RATE" = 16,
"MID_PRICE" = 17,
"EMPTY_BOOK" = 18,
"SETTLE_HIGH_PRICE" = 19,
"SETTLE_LOW_PRICE" = 20,
"PRIOR_SETTLE_PRICE" = 21,
"SESSION_HIGH_BID" = 22,
"SESSION_LOW_OFFER" = 23,
"EARLY_PRICE" = 24,
"AUCTION_CLEARING_PRICE" = 25,
"SWAP_VALUE_FACTOR" = 26,
"VALUE_ADJ_LONG" = 27,
"CUMMULATIVE_VALUE_ADJ_LONG" = 28,
"DAILY_VALUE_ADJ_SHORT" = 29,
"CUMMULATIVE_VALUE_ADJ_SHORT" = 30,
"FIXING_PRICE" = 31,
"CASH_RATE" = 32,
"RECOVERY_RATE" = 33,
"RECOVERY_RATE_LONG" = 34,
"RECOVERY_RATE_SHORT" = 35
);
CLASS "deltix.timebase.api.messages.MarketMessage" (
"sequenceNumber" 'Sequence Number' INTEGER
)
AUXILIARY
NOT INSTANTIABLE;
CLASS "deltix.timebase.api.messages.TradeMessage" UNDER "deltix.timebase.api.messages.MarketMessage" (
"exchangeId" 'Exchange Code' VARCHAR ALPHANUMERIC (10),
"price" 'Price' FLOAT DECIMAL,
"size" 'Size' FLOAT DECIMAL,
"condition" 'Trade Condition' VARCHAR,
"aggressorSide" 'Aggressor Side' "deltix.timebase.api.messages.AggressorSide",
"netPriceChange" 'Net Price Change' FLOAT DECIMAL,
"eventType" 'Event Type' "deltix.timebase.api.messages.MarketEventType"
);
)
OPTIONS(COPY_STREAM = 'trades-topic-copy')

Generating a Project

To create a project, use the Solution Generator as shown below:

./solgen.sh -cpp -cpp.project-type vs2015 -cpp.WriteTopic -cpp.ReadTopic cpp.project.root /path/to/sample timebase.topic trades-topic timebase.url dxtick://localhost:8011

Upon execution, the Solution Generator will create a new C++ solution in the specified folder (/path/to/sample in this example). The generated solution can be opened and compiled directly within the Visual Studio IDE.

Writing to a Topic

Below is an example of writing to a topic, generated by the Solution Generator:

void writetopic() {
// Create TimeBase connection
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));

try {
// Open in read-write mode
db->open(false);

TopicDB & topicDB = db->getTopicDB();

// Create topic publisher
PublishingOptions opt;
std::string key = "trades-topic";
unique_ptr<TickDirectLoader> loader(topicDB.createPublisher(key, opt));
dx_trades_topic::Dx_Trades_topicTopicEncoder encoder(loader.get());

// Message for writing
dx_trades_topic::deltix::timebase::api::messages::Dx_TradeMessage dx_trademessage;

// Create instruments
auto instrument = loader->getInstrumentId(InstrumentIdentity(DxApi::InstrumentType::EQUITY, "SYMBOL"));

for (int sent = 0; sent < 10; ++sent) {
// Write fields of deltix.timebase.api.messages.TradeMessage:
dx_trademessage.clear();
dx_trademessage.timestamp = now_ns();
dx_trademessage.typeId = dx_trades_topic::Dx_TradeMessageType;
dx_trademessage.entityId = instrument;
dx_trademessage.setSequenceNumber(1);
dx_trademessage.setExchangeId("NY4");
dx_trademessage.setPrice(1.5);
dx_trademessage.setSize(1.5);
dx_trademessage.setNetPriceChange(1.5);
dx_trademessage.setAggressorSide(dx_trades_topic::deltix::timebase::api::messages::Dx_AggressorSideEnum::BUY);
dx_trademessage.setCondition("Any string");
dx_trademessage.setEventType(dx_trades_topic::deltix::timebase::api::messages::Dx_MarketEventTypeEnum::BID);

encoder.send(dx_trademessage);
cout << "[SENT]: " << dx_trademessage.toString() << endl;
}

// Close loader after send
loader->close();
} catch (exception &e) {
cout << e.what() << endl;
} catch (...) {
cout << "System exception" << endl;
}

db->close();
}

Reading from a Topic

Below is an example of reading data from a topic, generated by the Solution Generator:

void readtopic() {
// Create TimeBase connection
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));

try {
// Open TimeBase connection
db->open(false);

TopicDB & topicDB = db->getTopicDB();

// Create topic poller
ConsumerOptions opt;
unique_ptr<TickMessagePoller> messagePoller(topicDB.createPollingConsumer("trades-topic", opt));
dx_trades_topic::Dx_Trades_topicTopicDecoder decoder(messagePoller.get());

int read = 0;
MessageProcessor messageProcessor([&decoder](const InstrumentMessage& header) {
NativeMessage* message = decoder.decode(header);
cout << "[MSG]: " << message->toString() << endl;
});

while (!messagePoller->isAtEnd() && read < 10) {
read += messagePoller->processMessages(10, messageProcessor);
}

cout << "Read " << read << " messages" << endl;

// Close messagePoller after reading
messagePoller->close();
} catch (exception &e) {
cout << e.what() << endl;
} catch (...) {
cout << "System exception" << endl;
}

db->close();
}